其他
Structured Streaming | Apache Spark中处理实时数据的声明式API
引言
第一,不同于要求用户构造物理执行计划的API,Structured Streaming是一个基于静态关系查询(使用SQL或DataFrames表示)的完全自动递增的声明性API。
第二,Structured Streaming旨在支持端到端实时的应用,将流处理与批处理以及交互式分析结合起来。
我们发现,在实践中这种结合通常是关键的挑战。Structured Streaming的性能是Apache Flink的2倍,是Apacha Kafka 的90倍,这源于它使用的是Spark SQL的代码生成引擎。它也提供了丰富的操作特性,如回滚、代码更新、混合流\批处理执行。
我们通过实际数据库上百个生产部署的案例来描述系统的设计和使用,其中最大的每个月处理超过1PB的数据。
1.介绍
尽管在过去的几年中分布式流技术取得了巨大的进步,但在实际生产中使用它们还是有不小的挑战。我们从描述这些挑战开始,基于我们在Spark Streaming上的经验,这是最早期的流处理引擎,它提供了高度抽象和函数式的API。
我们发现使用中频繁的出现两种挑战:
第一,流处理系统时常要求用户考虑复杂的物理执行概念,例如at-least-once delivery,状态存储和触发模式,这些都是流处理系统独有的挑战。
第二,许多系统只关注流式计算,但是实际用例中,流通常是大型业务应用的一部分,它包含批处理,会和静态数据进行连接,且会进行交互式查询。集成这些带有其他工作的流处理系统需要大量的工程工作。
基于这些挑战,我们描述结构化流为一种新的用于流处理的高度抽象的API。Structured Streaming吸取了很多流处理系统的点子,比如Google Dataflow的分离事件发生事件和触发器处理时间,使用关系型执行引擎获得更好的性能,以及提供一个综合性的API,旨在使他们更易用且整合进Apache Spark中。特别的,Structured Streaming在两点上和广泛使用的开源流数据处理API不同:
增量查询模型:
端到端应用的支持
第一,Structured Streaming重用了Spark SQL的执行引擎,包括它的optimizer和runtime code generator。这样与其他流处理系统相比,Structured Steaming具有了更高的吞吐量。(Flink的两倍,Kafka的90倍),这也让Structured Streaming从Spark SQL以后的更新中受益。引擎默认运行在microbatch的处理模式下,但是对于一些查询,它也可以使用一个低延迟的连续操作。
第二,我们发现,操作一个流处理应用是具有挑战性的,所以我们设计引擎支持对故障、代码更新已输出数据的重新计算。例如,一个常见的问题是流中心的数据导致应用程序崩溃,输出一个错误的结果,用户知道很久以后才会注意到(例如,由于错误解析字段)。在Structured Streaming中,每个应用程序维护一个write-ahead event log(WAL),使用JSON格式,管理者可以从任意点重新启动应用程序。如果应用程序由于用户定义函数中的错误而崩溃,管理员可以更新UDF并且从它停止的地方重启,这时会自动的读取WAL。如果应用程序输出了错误的数据,管理员可以手动的回滚到问题开始之前,重新计算。
我们的团队从2016年开始一直在Databricks的云服务中运行Structured Streaming,以及在内部使用它,所以我们用一些例子来总结本章。生产环境的应用程序范围包括交互式网络安全分析、自动报警增量提取以及ETL过程。最大的客户应用程序每月处理超过1PB的数据,在数百台机器上运行。在雅虎的Streaming Benchmark测试中,Structured Streaming的表现是Flink的2倍,Kafka的90倍。
2.流处理的挑战
2.1 复杂和低级的API
作为一个具体的例子,Google Dataflow有一个功能强大的API,具有丰富的事件处理选项去处理聚合、窗口化和无序数据。然而在这个模型中,用户需要指定窗口模式,触发模式以及触发细化模式。原始API要求用户编写一个物理操作视图,而不是逻辑查询,所以每个用户都需要理解增量处理的复杂性。
其他的APIs,例如Spark Streaming和Flink的DataStream API,也基于编写物理操作的DAG,且提供了复杂的选项去维护状态。此外,应用程序变得更加复杂,这放松了exactly-once语义,要求用户设计和实现一个一致性模型。为了解决这个问题,我们设计了Structured Streaming来实现简单的增量查询模型简单的表示应用程序。此外,我们发现添加可定制的有状态处理操作符仍然支持高级用户构建自己的处理逻辑,比如基于会话的定制、窗口(这些操作符同样可以在批任务中工作)。
2.2 集成到端到端应用程序
(1)应用程序的业务目的可能是对最新数据进行交互式查询。在本例中,一个流处理任务更新RDBMS或者Hive中的汇总表。重要的是,当流作业在更新结果的过程中,它是原子的,用户不要看到部分结果。这对于基于文件的大数据系统比如Hive来说是困难的,Hive中的表被分割到不同的文件,甚至并行的加载到数据仓库。
(2)在ETL作业中可能需要加入从另一个存储系统加载静态数据的流或使用批处理计算进行转换。这种情况下,两者间的一致性就变得异常重要(如果静态数据被更新怎么办?),在同一个API中编写整个计算是很有用的。
(3)一个团队可能偶尔需要用批处理方式运行它的流处理业务逻辑,例如:在旧数据上填充结果或者测试代码的其他版本。用其他系统重写代码既费时又容易出错。
我们通过Structured Streaming来解决这个挑战,它与Spark批处理和交互API紧密结合。
2.3 业务挑战
(1)失败:这是研究中最受关注的问题。除了单节点故障外,系统还需要支持整个应用程序的优雅关闭和重启,例如,操作人员将其迁移到一个新的集群。
(2)代码更新:应用程序很少是完美的,所以开发者需要更新他们的代码。更新之后,他们可能想要应用程序在停止的地方重新启动,或者重新计算由于错误而导致的错误结果。流处理系统的状态管理需要同时支持者两者,且要实现故障恢复机制,系统还应支持运行时更新。
(3)重新调节:随着时间推移,应用程序的负载会发生变化,长期来看,负载会不断增大,所以用户可能希望动态的对其进行缩放,特别是在云中。
(4)落单节点:系统中的节点可能会因为软件或硬件问题而慢下来,这会拖慢整个应用程序的吞吐量,系统应该自动处理这种情况。
(5)监控:流处理系统需要让用户看到系统的负载、状态以及其他的指标。
2.4 性能挑战
到目前为止,我们以吞吐量为主要性能度量,因为我们发现在大规模的流应用程序中,吞吐量通常是最重要的度量。需要分布式流处理系统的应用程序通常有着来自外部数据源的大量数据(例如移动设备、传感器或物联网),数据可能在到达系统时已经产生了延迟。这就是为什么事件时间处理是这些系统中的重要特性。相比之下,延迟敏感的应用程序,如高频交易或物理系统控制循环通常运行在单个放大器上,甚至是定制硬件如ASIC和FPGA上。然而,我们也设计Structured Streaming支持在延迟优化的引擎上执行,并实现了任务的连续处理模式,这些将在第6.3节中进行描述。这与Spark Streaming相比是一个很大的不同。
3 Structured Streaming概述
输入和输出
(1)输入sources必须可重读,当一个节点崩溃的时候允许系统重新读取最近的输入数据。实践中,组织需要使用可靠的消息总线,比如Kinesis或Kafka,或者一个持久的文件系统。
(2)输出sinks必须支持幂等写操作,确保在节点失败时进行可靠的恢复。Structured Streaming对特定的sinks支持原子输出,作业输出的更新呈现原子性,即使它是由多个并行工作的节点输出的。
除了外部系统,Structured Streaming还支持Spark SQL表的输入和输出。例如,用户可以从Spark的任意批输入源计算一个静态表并将其与流进行连接操作,或请求Structured Streaming输出一个内存中的Spark表用于交互式查询。
API
特别的,为了支持流,Structured Streaming增加了几个API功能适应现有的Spark SQL API。
(1)Triggers控制引擎计算的频率
(2)用户可以将一列标记为event time(时间戳),并设置一个watermark决定event time的过期。
(3)有状态操作符允许用户跟踪和更新可变状态,通过键来实现复杂的处理,如定制基于会话的窗口。
Execution
在这两种情况下,Structured Streaming都使用以下两种形式的持久化存储来实现容错。第一,通过WAL日志跟踪哪些数据已被处理并可靠地写入。对于一些sinks,这个日志可以与sink结合以对sink进行原子更新;第二,系统使用大规模的状态存储保存长时间运行的聚合操作的状态快照。这些都是异步写入,并且可能“落后”于最新写入的数据。系统将自动跟踪日志中最后一次更新的状态,并从此处开始重新计算状态。日志和状态存储都可以运行于可插拔存储系统(HDFS或者S3)。
操作特性
4 编程模型
4.1 简短示例
//define a DataFrame to read from static data
data = spark.read.format("json").load("/in")
//Transform it to compute a result
counts = data.groupBy($"country").count()
//write to a static data sink
counts.write.format("parquet").save("/counts")
//Define a DataFrame to read streaming data
data = spark.readStream.format("json").load("/in")
//Transform it to compute a result
counts = data.groupBy($"country").count()
//Write to a streaming data sink
counts.writeStream.format("parquet").outputMode("complete").start("/counts")
在底层,Structured Streaming将由source到sink的转换自动递增化,并以流方式执行它。引擎也将自动维护状态和检查点到外部存储-本例中,存在一个运行的计数聚合,因此引擎将跟踪每个国家的计数。
最后,API自然支持窗口和事件时间,通过Spark SQL现有的聚合操作符。例如,我们不按国家来计数,而是设置一个一小时的滑动窗口,每5分钟滑动一次,根据窗口进行计数:
//Count events by windows on the "time" field
data.groupBy(window($“time”,"1h","5min")).count()
4.2 编程模型语义
(1)每个输入源都提供一部分有序的记录。我们假设这里是部分记录是因为一些消息总线系统是并行的且不保证整个记录的顺序——例如Kafka将流分成“分区”。
(2)用户提供一个查询,在输入数据上执行,输出一个结果表(result table),这个结果表可以在任意时间的任意点输出。Structured Streaming在所有输入源中的数据前缀上运行此查询始终会产生一致的结果。也就是说,绝不会发生这样的情况,结果表中合并了一条输入的数据但没有合并在它之前的数据。此外,这些前缀将随着时间推移而增加。
(3)Triggers告诉系统何时运行新的增量计算,何时更新结果表。例如,在microbatch模式下,用户可能每分钟触发一个增量更新。
(4)sink的output mode指定了结果表如何写入到输出系统中。引擎支持以下三种不同的模式:
complete
append
update
总之,使用Structured Streaming模型,只要用户可以理解普通的Spark和DataFrame查询,即可了解结果表的内容和将要写入sink的值。用户无需担心一致性、失败或不正确的处理顺序。
最后,读者可能会注意到我们定义的一些输出模式与某些类型的查询不兼容。例如,假设我们按照国家进行聚合技术,如上一节中代码所示,我们希望使用append输出模式。系统没法保证什么时候停止接收某一特定国家的记录,所以这个查询和输出模式的组合不正确。
4.3 流中的特定操作符
4.3.1 Event time watermarks
(1)允许任意延迟的数据可能需要存储任意大的状态。例如,如果我们按照1分钟的event time窗口对数据进行计数,系统需要记录每一个1分钟的窗口计数,因为迟到的数据可能属于任意一分钟。这将迅速导致大量的状态。
(2)一些sinks不支持数据回退,这使得它能在超时后为指定的event time写出结果。例如,自定义下游应用程序希望使用“最终”结果启动工作,但是它不支持回退。append输出模式的sink也不支持回退。
Structured Streaming允许开发人员为event time列设置一个watermark,使用withWatermark操作符。这个操作符在一个给定的时间戳列C上设置一个系统的延迟阈值Tc。在任意时间,C的watermark为max(C)-Tc.请注意,这种watermark是健壮的,可以防止积压数据:如果系统在一段时间内无法跟上输入速率,则watermark不会随意的往前移动,所有在T秒内到达的时间仍会被处理。
如果watermark存在,它会影响有状态操作符忘记旧状态,Structured Streaming可以以append模式输出数据到sink。不同的输入流会有不同的watermarks。
4.3.2 Stateful Operators
mapGroupsWithState操作符,用于分组数据集,数据集中的键类型为K,值的类型为V,接收用户定义的具有以下参数的update function:
(1)key of type K
(2)newValue of type Iterator[V]
(3)state of type GroupState[S],where S is a user-specified class
//define an update function that simply tracks the
//number of events for each key as its state, returns
//that as its result, and times out keys after 30min
def updateFunc(key:Userid, newValues:Iterator[Event],
state:GroupState[Int]):Int = {
val totalEvents = state.get() + newValues.size()
state.update(totalEvents)
state.setTimeoutDuration("30 min")
return totalEvents
//Use this update function on a stream,returning a
//new table lens that contains the session lengths
lens = events.groupByKey(event => event.userId).
mapGroupsWithState(updateFunc)
最后,update函数返回用户指定的返回类型R。mapGroupsWithState的返回值是一个新表,包含了数据中每组的最终R条输出记录(当group关闭或者超时)。例如,开发人员希望使用mapGroupsWithState跟踪用户在网站上的会话,并输出为每个会话点击的页面总数。
图3展示了如何使用mapGroupsWithState跟踪用户会话,其中会话被定义为一系列事件,使用相同的用户标识,他们之间的间隔不到30分钟。我们在每个会话中输出时间的最终数量作为返回值R。然后,一个作业可以通过聚合结果表计算每个会话时间数的平均值。
另一个有状态操作符,flatMapGroupsWithState跟mapGroupsWithState十分相似,但是其更新函数每次更新时可以返回0或者更多,而不只是1。例如,这个操作符可以用来手动实现stream-to-table的join操作。这两个操作符也可以在批处理模式下工作,但是其更新函数只会被调用一次。
五.查询计划
5.1 Analysis
5.2 Incrementalization
引擎可以递增化一个受限制的、不断增长的查询。从Spark2.3.0版本开始,支持的查询包括:
-任意数量的选择,投影和select distincts。
-流和表,两个流之间的内连接、左外连接和右外连接。对一个流进行外部连接,连接条件必须包含一个watermark。
-有状态操作符比如mapGroupsWithState
-最多一个聚合(可能在复合键上)
-聚合后的排序,只能在complete输出模式下
引擎使用Catalyst转换规则将这些支持的查询映射为物理执行树,执行计算和状态管理。例如,用户查询中的一个聚合可能会映射到有状态聚合操作符,并跟踪Structured Streaming中的开放组的状态存储和输出。在内部,Structured Streaming还跟踪在增量化过程中产生的DAG重的每个物理操作符的输出模式,类似于Dataflow中的细化模式。例如,一些操作会更新已发出的记录(相当于update模式),另一些值更新发出的新记录(append模式)。至关重要的是,在Structured Streaming中,用户不必手动指定这些内部的DAG模式。
增量化是Structured Streaming研究中的一个活跃领域,但我们发现,即使是现今的很多受限的查询集也适用于很多用例。在其他情况下,用户利用Structured Streaming有状态的操作符实现自定义增量处理逻辑,以保持其选择的状态。我们希望在引擎中增加更剑仙的自动化递增技术。
5.3 Query Optimization
六.应用程序执行
6.1 状态管理和恢复
鉴于这些属性,Structured Streaming使用以下机制来进行状态跟踪,如下图所示:
(2)任何需要定期、异步检查state store中状态的操作都尽可能使用增量的检查点。它们同时存储了epoch ID和每个检查点。这些检查点不需要在每个epoch都发生或阻塞处理。
(3)输出操作将提交的epoch写入日志。Master节点在提交下一个epoch前等待所有运行操作的节点报告。根据sink的不同,如果sink支持多节点写入,Master会运行多个节点完成写入。这意味着如果流应用程序失败,只有一个epoch会被部分写入。
(4)恢复后,应用程序的新实例会查找log中最后一个未被提交到sink的epoch,其中包括其开始和结束offsets。然后使用之前epoch的offset重建应用程序内存内的状态。这只需要加载旧的状态并运行那些epoch,使用其禁用输出时相同的偏移量。最后,系统重新运行上一个epoch,依赖于sink的幂等性写出结果,然后开始新的epoch。
最后,状态管理中的所有设计对用户代码来说都是透明的。聚合操作和用户自定义状态管理操作(例如mapGroupsWithState)自动向state store中存储检查点,不需要用户自己编码实现。用户的数据类型只需要序列化即可。
6.2 微批处理模式
(1)动态负载平衡:每个操作都可以被分成很小的、独立的task在多个节点上进行调度,这样系统就可以自动平衡这些节点(如果某些节点执行速度比其他节点慢)。
(2)细粒度的故障恢复:如果节点失败,则可以仅仅执行其上的任务,而无需回滚整个集群到某检查点,这和大多数基于拓扑的系统一样。此外,丢失的任务可以并行的重新运行,这可以进一步减少恢复时间。
(3)失效节点处理:Spark将启动备份副本,就像他在批处理作业中所做的,下游任务也会使用最先完成的输出。
(4)重新调节:添加或删除节点与task一样简单,这将自动在所有可用节点上自动调度。
(5)规模和吞吐量:因为这个模式重用了Spark的批处理执行引擎,它集成了这个引擎所有的优化,比如高性能的shuffle实现以及在数千个节点上运行的能力。
这种模式的主要缺点是延迟时间长,因为在Spark中启动任务DAG是有开销的。然而,几秒的延迟在运行多步计算的大型集群上是可以实现的。
6.3 连续执行模式
这种执行模式的关键是选择声明性的API,不绑定到Structured Streaming的执行策略。例如,最早的Spark Streaming API有一些基于处理时间的操作泄露了微批的概念,这使其难以自动程序到另一种类型的引擎。相反,Structured Streaming的API和语义独立于之执行引擎:连续执行类似于更多的trigger。注意,与纯粹基于非同步的消息传递系统不同,如Storm,我们保留了trigger和epoch的概念在这个模型里,以便多个节点的输出可以协调并一起提交到sink。
因为API支持细粒度的执行,所以Structured Streaming的作业理论上可以运行在任何分布式的流引擎上。在连续处理引擎中,我们在Spark建立了一个简单的连续操作引擎,并且可以重用Spark的基础调度引擎和每个节点的操作符(代码生成操作)。Spark 2.3.0中的第一个版本只支持类似map的任务(没有shuffle操作),这是用户最常见的场景,但是后续的设计将会加入shuffle操作。
相比于批处理引擎,持续处理有两点不同:
(1)master节点在输入源的每个partition上启动一个long-running任务,但是启动多个epoch。如果其中一个任务失败了,Spark会重启它。
(2)epoch的协调是不同的。Master节点定期告诉node启动一个新的epoch,并接收每个输入partition上的一个开始offset,并将其写入WAL中。当要求node启动下一个epoch时,Master节点会接收到上一个epoch的结束offset,并将其写入WAL,当写入了所有结束offset后,会告诉节点提交这个epoch。
八.生产用例
8.1 信息安全平台
这个平台最大的挑战在:
(1)构建一个健壮且伸缩性强的流管道
(2)给分析人员提供一个高效的环境去查询新老数据。
使用AWS上提供的标准工具和服务,一个20人的团队花了6个多月的时间来构建和部署此平台的最初版本。这个最初版本有很多的限制,比如只能存储一小部分历史数据由于使用传统的数据仓库。相比之下,一个五人的工程师团队能够在两周内使用Structured Streaming重构这个平台。这个新平台支持更好的扩展性,且能够支持更复杂的分析,这是因为可以使用Spark ML API。接下来,我们提供一些例子来说明Structured Streaming的优点使这些成为可能。
首先,Structured Streaming能够自适应批的规模,使得开发人员可以构建一个能够处理大量工作的流管道,同时还能满足故障和代码升级。考虑一个流作业,它可能因为失败而离线,或者进行一次升级。当集群恢复上线时,它会开始自动处理离线时未处理的数据。最初,集群将使用大量的批处理去最大化吞吐量。一旦赶上,集群会切换为低延迟的小批量进行处理。这允许管理员定期升级集群,无需担心过度停机。
第二,Structured Streaming可以与其他流进行join操作,与历史表也可以,这样大大简化了分析。考虑一个简单的任务,识别哪个设备是来源于TCP连接的。事实证明,这项任务是具有挑战性的,因为移动设备的存在,因为这些设备的IP地址在每次它们加入网络时都是动态的。因此,只依靠TCP日志,不可能跟踪终端的连接。使用Structured Streaming,分析人员能够简单的解决这个问题。她可以简单的将TCP日志与DHCP日志进行join,将IP地址和MAC地址映射起来,然后使用组织内部的数据网络设备映射到MAC地址特定的机器和用户。另外,用户也可以即时的使用stateful operator进行join操作。
最后,使用相同的系统开发流、交互式查询和ETL为开发人员提供了快速迭代的能力,以及部署新的警报。特别的,它使得分析师能够构建和测试对检测脱机数据供给的查询,然后将这些查询部署在报警集群上。在一个例子中,一个分析师通过DNS开发了一个查询识别攻击。在这次攻击中,恶意软件通过将此信息装载到DNS中泄露机密信息,从而危及主机发送到攻击者拥有的外部DNS服务器的请求。一个用于检测这种攻击的简化查询实际上计算了在一定时间间隔内每个主机发送的DNS请求的总大小。如果聚合大于给定的阈值,则查询标记对应的主机可能受到危害。分析师利用历史数据来设置这个阈值,从而达到平衡假正率和假负率之间的期望平衡。一旦满足了结果,分析人员会简单地将此查询推到报警集群中去。
九.性能评价
9.1 性能 vs 其他流系统
9.2 可伸缩性
9.3 连续处理
结论
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧! 👇